#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# input-remapper - GUI for device specific keyboard mappings
# Copyright (C) 2025 sezanzeb <b8x45ygc9@mozmail.com>
#
# This file is part of input-remapper.
#
# input-remapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# input-remapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with input-remapper.  If not, see <https://www.gnu.org/licenses/>.

import json
import os
import time
import unittest
from unittest.mock import patch, MagicMock

import evdev
from evdev._ecodes import EV_ABS
from evdev.ecodes import EV_KEY, KEY_B, KEY_A, ABS_X, BTN_A, BTN_B

from inputremapper.configs.global_config import GlobalConfig
from inputremapper.configs.input_config import InputCombination, InputConfig
from inputremapper.configs.mapping import Mapping
from inputremapper.configs.paths import PathUtils
from inputremapper.configs.preset import Preset
from inputremapper.configs.keyboard_layout import keyboard_layout
from inputremapper.daemon import Daemon, DAEMON
from inputremapper.groups import groups
from inputremapper.injection.global_uinputs import GlobalUInputs, UInput
from inputremapper.injection.injector import InjectorState
from inputremapper.injection.mapping_handlers.mapping_parser import MappingParser
from inputremapper.input_event import InputEvent
from tests.lib.cleanup import cleanup
from tests.lib.fixtures import Fixture
from tests.lib.fixtures import fixtures
from tests.lib.logger import logger
from tests.lib.pipes import push_events, uinput_write_history_pipe
from tests.lib.test_setup import test_setup, is_service_running
from tests.lib.tmp import tmp


@test_setup
class TestDaemon(unittest.TestCase):
    new_fixture_path = "/dev/input/event9876"

    def setUp(self):
        self.daemon = None
        self.global_config = GlobalConfig()
        self.global_uinputs = GlobalUInputs(UInput)
        PathUtils.mkdir(PathUtils.get_config_path())
        self.global_config._save_config()
        self.mapping_parser = MappingParser(self.global_uinputs)

        # the daemon should be able to create them on demand:
        self.global_uinputs.devices = {}
        self.global_uinputs.is_service = True

    def tearDown(self):
        # avoid race conditions with other tests, daemon may run processes
        if self.daemon is not None:
            self.daemon.stop_all()
            self.daemon = None

        cleanup()

    @patch.object(os, "system")
    def test_connect(self, os_system_mock: MagicMock):
        self.assertFalse(is_service_running())

        # no daemon runs, should try to run it via pkexec instead.
        # It fails due to the patch on os.system and therefore exits the process
        self.assertRaises(SystemExit, Daemon.connect)
        os_system_mock.assert_called_once()
        self.assertIsNone(Daemon.connect(False))

        # make the connect command work this time by acting like a connection is
        # available:

        set_config_dir_callcount = 0

        class FakeConnection:
            def set_config_dir(self, *_, **__):
                nonlocal set_config_dir_callcount
                set_config_dir_callcount += 1

            def Introspect(self, timeout=0):
                return ""

        with patch.object(DAEMON, "get_proxy") as get_mock:
            get_mock.return_value = FakeConnection()
            self.assertIsInstance(Daemon.connect(), FakeConnection)
            self.assertEqual(set_config_dir_callcount, 1)

            self.assertIsInstance(Daemon.connect(False), FakeConnection)
            self.assertEqual(set_config_dir_callcount, 2)

    def test_daemon(self):
        # remove the existing system mapping to force our own into it
        if os.path.exists(PathUtils.get_config_path("xmodmap.json")):
            os.remove(PathUtils.get_config_path("xmodmap.json"))

        preset_name = "foo"

        group = groups.find(name="gamepad")

        # unrelated group that shouldn't be affected at all
        group2 = groups.find(name="Bar Device")

        preset = Preset(group.get_preset_path(preset_name))
        preset.add(
            Mapping.from_combination(
                input_combination=InputCombination(
                    [InputConfig(type=EV_KEY, code=BTN_A)]
                ),
                target_uinput="keyboard",
                output_symbol="a",
            )
        )
        preset.add(
            Mapping.from_combination(
                input_combination=InputCombination(
                    [InputConfig(type=EV_ABS, code=ABS_X, analog_threshold=-1)]
                ),
                target_uinput="keyboard",
                output_symbol="b",
            )
        )
        preset.save()
        self.global_config.set_autoload_preset(group.key, preset_name)

        """Injection 1"""

        # should forward the event unchanged
        push_events(
            fixtures.gamepad,
            [InputEvent.key(BTN_B, 1, fixtures.gamepad.get_device_hash())],
        )

        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )

        self.assertFalse(uinput_write_history_pipe[0].poll())

        # has been cleanedUp in setUp
        self.assertNotIn("keyboard", self.global_uinputs.devices)

        logger.info(f"start injector for {group.key}")
        self.daemon.start_injecting(group.key, preset_name)

        # created on demand
        self.assertIn("keyboard", self.global_uinputs.devices)
        self.assertNotIn("gamepad", self.global_uinputs.devices)

        self.assertEqual(self.daemon.get_state(group.key), InjectorState.STARTING)
        self.assertEqual(self.daemon.get_state(group2.key), InjectorState.UNKNOWN)

        event = uinput_write_history_pipe[0].recv()
        self.assertEqual(self.daemon.get_state(group.key), InjectorState.RUNNING)
        self.assertEqual(event.type, EV_KEY)
        self.assertEqual(event.code, BTN_B)
        self.assertEqual(event.value, 1)

        logger.info(f"stopping injector for {group.key}")
        self.daemon.stop_injecting(group.key)
        time.sleep(0.2)
        self.assertEqual(self.daemon.get_state(group.key), InjectorState.STOPPED)

        try:
            self.assertFalse(uinput_write_history_pipe[0].poll())
        except AssertionError:
            print("Unexpected", uinput_write_history_pipe[0].recv())
            # possibly a duplicate write!
            raise

        """Injection 2"""
        logger.info(f"start injector for {group.key}")
        self.daemon.start_injecting(group.key, preset_name)

        time.sleep(0.1)
        # -1234 will be classified as -1 by the injector
        push_events(
            fixtures.gamepad,
            [InputEvent.abs(ABS_X, -1234, fixtures.gamepad.get_device_hash())],
        )
        time.sleep(0.1)

        self.assertTrue(uinput_write_history_pipe[0].poll())

        # the written key is a key-down event, not the original
        # event value of -1234
        event = uinput_write_history_pipe[0].recv()

        self.assertEqual(event.type, EV_KEY)
        self.assertEqual(event.code, KEY_B)
        self.assertEqual(event.value, 1)

    def test_refresh_on_start(self):
        if os.path.exists(PathUtils.get_config_path("xmodmap.json")):
            os.remove(PathUtils.get_config_path("xmodmap.json"))

        preset_name = "foo"
        key_code = 9
        group_name = "9876 name"

        # expected key of the group
        group_key = group_name

        group = groups.find(name=group_name)
        # this test only makes sense if this device is unknown yet
        self.assertIsNone(group)

        keyboard_layout.clear()
        keyboard_layout._set("a", KEY_A)

        preset = Preset(PathUtils.get_preset_path(group_name, preset_name))
        preset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=EV_KEY, code=key_code)]),
                "keyboard",
                "a",
            )
        )

        # make the daemon load the file instead
        with open(PathUtils.get_config_path("xmodmap.json"), "w") as file:
            json.dump(keyboard_layout._mapping, file, indent=4)
        keyboard_layout.clear()

        preset.save()
        self.global_config.set_autoload_preset(group_key, preset_name)
        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )

        # make sure the devices are populated
        groups.refresh()

        # the daemon is supposed to find this device by calling refresh
        fixture = Fixture(
            capabilities={evdev.ecodes.EV_KEY: [key_code]},
            phys="9876 phys",
            info=evdev.device.DeviceInfo(4, 5, 6, 7),
            name=group_name,
            path=self.new_fixture_path,
        )
        fixtures[self.new_fixture_path] = fixture
        push_events(fixture, [InputEvent.key(key_code, 1, fixture.get_device_hash())])
        self.daemon.start_injecting(group_key, preset_name)

        # test if the injector called groups.refresh successfully
        group = groups.find(key=group_key)
        self.assertEqual(group.name, group_name)
        self.assertEqual(group.key, group_key)

        time.sleep(0.1)
        self.assertTrue(uinput_write_history_pipe[0].poll())

        event = uinput_write_history_pipe[0].recv()
        self.assertEqual(event, (EV_KEY, KEY_A, 1))

        self.daemon.stop_injecting(group_key)
        time.sleep(0.2)
        self.assertEqual(self.daemon.get_state(group_key), InjectorState.STOPPED)

    def test_refresh_for_unknown_key(self):
        device_9876 = "9876 name"
        # this test only makes sense if this device is unknown yet
        self.assertIsNone(groups.find(name=device_9876))

        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )

        # make sure the devices are populated
        groups.refresh()

        self.daemon.refresh()

        fixtures[self.new_fixture_path] = Fixture(
            capabilities={evdev.ecodes.EV_KEY: [evdev.ecodes.KEY_A]},
            phys="9876 phys",
            info=evdev.device.DeviceInfo(4, 5, 6, 7),
            name=device_9876,
            path=self.new_fixture_path,
        )

        self.daemon._autoload("25v7j9q4vtj")
        # this is unknown, so the daemon will scan the devices again

        # test if the injector called groups.refresh successfully
        self.assertIsNotNone(groups.find(name=device_9876))

    def test_xmodmap_file(self):
        """Create a custom xmodmap file, expect the daemon to read keycodes from it."""
        from_keycode = evdev.ecodes.KEY_A
        target = "keyboard"
        to_name = "q"
        to_keycode = 100

        name = "Bar Device"
        preset_name = "foo"
        group = groups.find(name=name)

        config_dir = os.path.join(tmp, "foo")

        path = os.path.join(config_dir, "presets", name, f"{preset_name}.json")

        preset = Preset(path)
        preset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=EV_KEY, code=from_keycode)]),
                target,
                to_name,
            )
        )
        preset.save()

        keyboard_layout.clear()

        push_events(
            fixtures.bar_device,
            [
                InputEvent.key(
                    from_keycode,
                    1,
                    origin_hash=fixtures.bar_device.get_device_hash(),
                )
            ],
        )

        # an existing config file is needed otherwise set_config_dir refuses
        # to use the directory
        config_path = os.path.join(config_dir, "config.json")
        self.global_config.path = config_path
        self.global_config._save_config()

        # finally, create the xmodmap file
        xmodmap_path = os.path.join(config_dir, "xmodmap.json")
        with open(xmodmap_path, "w") as file:
            file.write(f'{{"{to_name}":{to_keycode}}}')

        # test setup complete

        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )
        self.daemon.set_config_dir(config_dir)

        self.daemon.start_injecting(group.key, preset_name)

        time.sleep(0.1)
        self.assertTrue(uinput_write_history_pipe[0].poll())

        event = uinput_write_history_pipe[0].recv()
        self.assertEqual(event.type, EV_KEY)
        self.assertEqual(event.code, to_keycode)
        self.assertEqual(event.value, 1)

    def test_start_stop(self):
        group_key = "Qux/[Device]?"
        group = groups.find(key=group_key)
        preset_name = "preset8"

        daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )
        self.daemon = daemon

        pereset = Preset(group.get_preset_path(preset_name))
        pereset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=EV_KEY, code=KEY_A)]),
                "keyboard",
                "a",
            )
        )
        pereset.save()

        # start
        daemon.start_injecting(group_key, preset_name)
        # explicit start, not autoload, so the history stays empty
        self.assertNotIn(group_key, daemon.autoload_history._autoload_history)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))
        # path got translated to the device name
        self.assertIn(group_key, daemon.injectors)

        # start again
        previous_injector = daemon.injectors[group_key]
        self.assertNotEqual(previous_injector.get_state(), InjectorState.STOPPED)
        daemon.start_injecting(group_key, preset_name)
        self.assertNotIn(group_key, daemon.autoload_history._autoload_history)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))
        self.assertIn(group_key, daemon.injectors)
        time.sleep(0.2)
        self.assertEqual(previous_injector.get_state(), InjectorState.STOPPED)
        # a different injetor is now running
        self.assertNotEqual(previous_injector, daemon.injectors[group_key])
        self.assertNotEqual(
            daemon.injectors[group_key].get_state(), InjectorState.STOPPED
        )

        # trying to inject a non existing preset keeps the previous inejction
        # alive
        injector = daemon.injectors[group_key]
        daemon.start_injecting(group_key, "qux")
        self.assertEqual(injector, daemon.injectors[group_key])
        self.assertNotEqual(
            daemon.injectors[group_key].get_state(), InjectorState.STOPPED
        )

        # trying to start injecting for an unknown device also just does
        # nothing
        daemon.start_injecting("quux", "qux")
        self.assertNotEqual(
            daemon.injectors[group_key].get_state(), InjectorState.STOPPED
        )

        # after all that stuff autoload_history is still unharmed
        self.assertNotIn(group_key, daemon.autoload_history._autoload_history)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))

        # stop
        daemon.stop_injecting(group_key)
        time.sleep(0.2)
        self.assertNotIn(group_key, daemon.autoload_history._autoload_history)
        self.assertEqual(daemon.injectors[group_key].get_state(), InjectorState.STOPPED)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))

    def test_autoload(self):
        preset_name = "preset7"
        group_key = "Qux/[Device]?"
        group = groups.find(key=group_key)

        daemon = Daemon(self.global_config, self.global_uinputs, self.mapping_parser)
        self.daemon = daemon

        preset = Preset(group.get_preset_path(preset_name))
        preset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=EV_KEY, code=KEY_A)]),
                "keyboard",
                "a",
            )
        )
        preset.save()

        # no autoloading is configured yet
        self.daemon._autoload(group_key)
        self.assertNotIn(group_key, daemon.autoload_history._autoload_history)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))

        self.global_config.set_autoload_preset(group_key, preset_name)
        len_before = len(self.daemon.autoload_history._autoload_history)
        # now autoloading is configured, so it will autoload
        self.daemon._autoload(group_key)
        len_after = len(self.daemon.autoload_history._autoload_history)
        self.assertEqual(
            daemon.autoload_history._autoload_history[group_key][1],
            preset_name,
        )
        self.assertFalse(daemon.autoload_history.may_autoload(group_key, preset_name))
        injector = daemon.injectors[group_key]
        self.assertEqual(len_before + 1, len_after)

        # calling duplicate get_autoload does nothing
        self.daemon._autoload(group_key)
        self.assertEqual(
            daemon.autoload_history._autoload_history[group_key][1],
            preset_name,
        )
        self.assertEqual(injector, daemon.injectors[group_key])
        self.assertFalse(daemon.autoload_history.may_autoload(group_key, preset_name))

        # explicit start_injecting clears the autoload history
        self.daemon.start_injecting(group_key, preset_name)
        self.assertTrue(daemon.autoload_history.may_autoload(group_key, preset_name))

        # calling autoload for (yet) unknown devices does nothing
        len_before = len(self.daemon.autoload_history._autoload_history)
        self.daemon._autoload("unknown-key-1234")
        len_after = len(self.daemon.autoload_history._autoload_history)
        self.assertEqual(len_before, len_after)

        # autoloading input-remapper devices does nothing
        len_before = len(self.daemon.autoload_history._autoload_history)
        self.daemon.autoload_single("Bar Device")
        len_after = len(self.daemon.autoload_history._autoload_history)
        self.assertEqual(len_before, len_after)

    def test_autoload_2(self):
        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )
        history = self.daemon.autoload_history._autoload_history

        # existing device
        preset_name = "preset7"
        group = groups.find(key="Foo Device 2")
        preset = Preset(group.get_preset_path(preset_name))
        preset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=3, code=2, analog_threshold=1)]),
                "keyboard",
                "a",
            )
        )
        preset.save()
        self.global_config.set_autoload_preset(group.key, preset_name)

        # ignored, won't cause problems:
        self.global_config.set_autoload_preset("non-existant-key", "foo")

        self.daemon.autoload()
        self.assertEqual(len(history), 1)
        self.assertEqual(history[group.key][1], preset_name)

    def test_autoload_3(self):
        # based on a bug
        preset_name = "preset7"
        group = groups.find(key="Foo Device 2")

        preset = Preset(group.get_preset_path(preset_name))
        preset.add(
            Mapping.from_combination(
                InputCombination([InputConfig(type=3, code=2, analog_threshold=1)]),
                "keyboard",
                "a",
            )
        )
        preset.save()

        self.global_config.set_autoload_preset(group.key, preset_name)

        self.daemon = Daemon(
            self.global_config,
            self.global_uinputs,
            self.mapping_parser,
        )
        groups.set_groups([])  # caused the bug
        self.assertIsNone(groups.find(key="Foo Device 2"))
        self.daemon.autoload()

        # it should try to refresh the groups because all the
        # group_keys are unknown at the moment
        history = self.daemon.autoload_history._autoload_history
        self.assertEqual(history[group.key][1], preset_name)
        self.assertEqual(self.daemon.get_state(group.key), InjectorState.STARTING)
        self.assertIsNotNone(groups.find(key="Foo Device 2"))


if __name__ == "__main__":
    unittest.main()
